Rxjava操作符:第3篇 Flitering Observable

本文基于Rxjava 2.x版本,介绍用于 Observable 过滤发射项目的操作符。

Operators that selectively emit items from a source Observable.

  • Debounce — only emit an item from an Observable if a particular timespan has passed without it emitting another item

    如果特定的时间跨度已经过去而没有发出另一个项目,则只从Observable中发出一个项目

  • Distinct — suppress duplicate items emitted by an Observable

    过滤重复发射源

  • ElementAt — emit only item n emitted by an Observable

    仅发出方法中指定的发射源

  • Filter — emit only those items from an Observable that pass a predicate test

    仅发出符合过滤条件的发射源

  • First — emit only the first item, or the first item that meets a condition, from an Observable

    仅保留发射序列的第一个发射源

  • IgnoreElements — do not emit any items from an Observable but mirror its termination notification

    忽略发射序列的所有元素,只会响应 onComplete 方法

  • Last — emit only the last item emitted by an Observable

    仅保留发射序列的最后一个发射源

  • Sample — emit the most recent item emitted by an Observable within periodic time intervals

    取每个时间周期内的最后一个发射源。

  • Skip — suppress the first n items emitted by an Observable

    跳过起始位置的 n 个发射源

  • SkipLast — suppress the last n items emitted by an Observable

    跳过终点位置的 n 个发射源

  • Take — emit only the first n items emitted by an Observable

    保留起始位置的 n 个发射源

  • TakeLast — emit only the last n items emitted by an Observable

    保留终点位置的 n 个发射源

debounce 操作符

当发射源 Emitter#onNext 方法发射间隔小于 debounce 方法指定间隔,取时间间隔有交集的最后一个发射源。在当前指定时间周期内只有一个发射源的时候,直接发出该发射源,如果指定时间周期出现多个发射源,取周期内最后一个发射源发出,其他的发射源过滤掉。根据下面的 debounce 图解可以得知,debounce 周期随发射源发出开始计算。注意和 sample 操作符区别

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// Diagram:
// -A--------------B----C-D-------------------E-|---->
// a---------1s
// b---------1s
// c---------1s
// d---------1s
// e-|---->
// -----------A---------------------D-----------E-|-->

Observable<String> source = Observable.create(emitter -> {
emitter.onNext("A");

Thread.sleep(1_500);
emitter.onNext("B");

Thread.sleep(500);
emitter.onNext("C");

Thread.sleep(250);
emitter.onNext("D");

Thread.sleep(2_000);
emitter.onNext("E");
emitter.onComplete();
});

source.subscribeOn(Schedulers.io())
.debounce(1, TimeUnit.SECONDS)
.blockingSubscribe(
item -> System.out.println("onNext: " + item),
Throwable::printStackTrace,
() -> System.out.println("onComplete"));

// prints:
// onNext: A
// onNext: D
// onNext: E
// onComplete

distinct 操作符

过滤重复的发射项目

1
2
3
4
5
6
7
8
9
Observable.just(2, 3, 4, 4, 2, 1)
.distinct()
.subscribe(System.out::println);

// prints:
// 2
// 3
// 4
// 1

elementAt 操作符

获取指定发射序列上某条数据

1
2
3
4
5
6
7
8
9
10
Observable<Long> source = Observable.<Long, Long>generate(() -> 1L, (state, emitter) -> {
emitter.onNext(state);

return state + 1L;
}).scan((product, x) -> product * x);

Maybe<Long> element = source.elementAt(5);
element.subscribe(System.out::println);

// prints 720

filter 操作符

过滤非函数判断指定的发射序列

1
2
3
4
5
6
7
8
Observable.just(1, 2, 3, 4, 5, 6)
.filter(x -> x % 2 == 0)
.subscribe(System.out::println);

// prints:
// 2
// 4
// 6

first 操作符

取发射序列的第一个发射源,和 last 操作符对应。first 方法中参数为指定默认值。

1
2
3
4
5
6
Observable<String> source = Observable.just("A", "B", "C");
Single<String> firstOrDefault = source.first("D");

firstOrDefault.subscribe(System.out::println);

// prints A

ignoreElement 操作符

忽略被观察者发射序列携带的数据,返回值为 Completable<T> ,只调用 onComplete 方法

1
2
3
4
5
6
7
8
Observable<Long> source = Observable.intervalRange(1, 5, 1, 1, TimeUnit.SECONDS);
Completable completable = source.ignoreElements();

completable.doOnComplete(() -> System.out.println("Done!"))
.blockingAwait();

// prints (after 5 seconds):
// Done!

last 操作符

去发射序列最后一个发射源,last 方法参数为默认值

1
2
3
4
5
6
Observable<String> source = Observable.just("A", "B", "C");
Single<String> lastOrDefault = source.last("D");

lastOrDefault.subscribe(System.out::println);

// prints C

sample 操作符

从指定的时间周期中取最后一个发射源。与 debounce 操作符的区别是,sample 操作符的时间周期是独立的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// Diagram:
// -A----B-C-------D-----E-|-->
// -0s-----c--1s---d----2s-|-->
// -----------C---------D--|-->

Observable<String> source = Observable.create(emitter -> {
emitter.onNext("A");

Thread.sleep(500);
emitter.onNext("B");

Thread.sleep(200);
emitter.onNext("C");

Thread.sleep(800);
emitter.onNext("D");

Thread.sleep(600);
emitter.onNext("E");
emitter.onComplete();
});

source.subscribeOn(Schedulers.io())
.sample(1, TimeUnit.SECONDS)
.blockingSubscribe(
item -> System.out.println("onNext: " + item),
Throwable::printStackTrace,
() -> System.out.println("onComplete"));

// prints:
// onNext: C
// onNext: D
// onComplete

skip 操作符

指定从起始位置跳过 n 个发射源后继续发射

1
2
3
4
5
6
7
8
9
10
11
12
Observable<Integer> source = Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

source.skip(4)
.subscribe(System.out::println);

// prints:
// 5
// 6
// 7
// 8
// 9
// 10

skipLast 操作符

指定跳过终点位置的 n 个发射源

1
2
3
4
5
6
7
8
9
10
11
12
Observable<Integer> source = Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

source.skipLast(4)
.subscribe(System.out::println);

// prints:
// 1
// 2
// 3
// 4
// 5
// 6

take 操作符

和 skip 操作符相反操作,指定保留从起始位置发射的 n 个发射源

1
2
3
4
5
6
7
8
9
10
Observable<Integer> source = Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

source.take(4)
.subscribe(System.out::println);

// prints:
// 1
// 2
// 3
// 4

takeLast 操作符

和 skipLast操作符相反,保留从终点位置发射的 n 个发射源

1
2
3
4
5
6
7
8
9
10
Observable<Integer> source = Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

source.takeLast(4)
.subscribe(System.out::println);

// prints:
// 7
// 8
// 9
// 10

参考文章:

https://github.com/ReactiveX/RxJava/wiki/Filtering-Observables

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×